Near Real-time Taxicab Driver Availability Analytics

Description: This notebook contains the code and plots for the analytics that can be performed on the output generated by consumer.py. Each section is commented, and the notebook can be extended to accommodate future requirements. @authors: Shyamal Akruvala, Amit Kumar version 1.1

Initialization

Please update the ROOT_DIR parameter in the cell below. It must point to the same root directory where consumer.py writes its output files.
In [1]:
# Importing needed libraries and setting up the parameters
import os
import pathlib
import pandas as pd
from glob import glob
from global_land_mask import globe
import plotly.express as px

ROOT_DIR = pathlib.Path(r"C:\Sparkathon") # The path needs to match where the consumer.py is producing the output files.
print("Initialization is completed. The directory where files are expected is: " + str(ROOT_DIR))
Initialization is completed. The directory where files are expected is: C:\Sparkathon
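When the notebook is run on more than one machine, the hard-coded ROOT_DIR is easy to forget. A minimal sketch, assuming a hypothetical environment variable named TAXI_ROOT_DIR (consumer.py does not define it), resolves the directory from the environment, falls back to the notebook's path, and warns if the sub-directories read later in this notebook are missing.

```python
import os
import pathlib

# Hypothetical environment variable; consumer.py does not define it.
ENV_VAR = "TAXI_ROOT_DIR"
# Fallback that matches the hard-coded path in the cell above.
DEFAULT_ROOT = pathlib.Path(r"C:\Sparkathon")
# Sub-directories this notebook reads (same spelling as in the later cells).
EXPECTED_SUBDIRS = ("data/raw", "Data/audit", "Data/online")


def resolve_root_dir(env_var=ENV_VAR, default=DEFAULT_ROOT):
    """Return the directory named by env_var if it is set, otherwise the default."""
    value = os.environ.get(env_var)
    return pathlib.Path(value) if value else default


def missing_subdirs(root, subdirs=EXPECTED_SUBDIRS):
    """List the expected sub-directories that do not exist under root."""
    return [s for s in subdirs if not (root / s).is_dir()]


ROOT_DIR = resolve_root_dir()
missing = missing_subdirs(ROOT_DIR)
if missing:
    print("Warning: missing under " + str(ROOT_DIR) + ": " + ", ".join(missing))
```

Note that the sub-directory names keep the notebook's mixed casing ("data" vs "Data"); this only matters on case-sensitive file systems.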

Raw Driver Data Analytics

Raw incoming driver data from the Kafka topic is captured and written to disk by consumer.py. The cells below use this data to produce the charts in this section.

In [2]:
# Creating a dataframe from all the csv files available under the raw directory. 
PATH_RAW = ROOT_DIR / 'data/raw'
EXT='*.csv'
all_csv_raw_files = [file for path, subdir, files in os.walk(PATH_RAW) for file in glob(os.path.join(path, EXT))]
l_raw = [pd.read_csv(filename, header=None) for filename in all_csv_raw_files]
df_raw = pd.concat(l_raw, axis=0, sort=False, ignore_index=True)  # ignore_index: each file's row labels otherwise restart at 0
colnames=['ts_minute','epoch_time','driver_id','on_duty','location'] 
df_raw.columns = colnames
#print(df_raw.shape)
print("Dataframe from raw data successfully created.")
Dataframe from raw data successfully created.
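The same "walk the directory, read every CSV, concatenate" pattern is repeated for the raw, audit and online directories. A sketch of a reusable helper, load_csv_dir (a hypothetical name, not part of the original code), uses pathlib's recursive rglob. It also skips zero-byte files, on the assumption that the consumer may leave empty part files behind (pd.read_csv raises on those), and returns an empty frame instead of letting pd.concat fail on an empty list.

```python
import pathlib
import pandas as pd


def load_csv_dir(directory, column_names, pattern="*.csv"):
    """Read every non-empty CSV under directory (recursively) into one DataFrame.

    Files are headerless, as in the cells above, so column_names are applied on read.
    """
    files = sorted(p for p in pathlib.Path(directory).rglob(pattern)
                   if p.is_file() and p.stat().st_size > 0)
    frames = [pd.read_csv(p, header=None, names=column_names) for p in files]
    if not frames:
        # pd.concat raises ValueError on an empty list, so return an empty frame instead
        return pd.DataFrame(columns=column_names)
    # ignore_index avoids duplicate row labels (every file starts at 0)
    return pd.concat(frames, axis=0, ignore_index=True)
```

For example, `df_raw = load_csv_dir(ROOT_DIR / 'data/raw', ['ts_minute', 'epoch_time', 'driver_id', 'on_duty', 'location'])` would replace the list comprehension, concatenation and column assignment in the cell above.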
In [3]:
# Cast the columns to their expected types; astype(int) raises if a value (for example NaN) cannot be converted
df_raw['ts_minute'] = pd.to_datetime(df_raw['ts_minute'])
df_raw['driver_id'] = df_raw['driver_id'].astype(int)
df_raw['on_duty'] = df_raw['on_duty'].astype(int)
#df_raw.head()
print("The datatypes are all valid.")
The datatypes are all valid.
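The cell above casts the columns and stops at the first value that cannot be converted. To see which rows are malformed instead, a sketch using errors="coerce" (which turns unparseable values into NaT/NaN) can flag them. The rules that driver_id must be positive and on_duty must be 0 or 1 mirror the filter used in the availability chart; find_invalid_rows is a hypothetical helper name.

```python
import pandas as pd


def find_invalid_rows(df):
    """Return rows whose ts_minute, driver_id or on_duty are missing or unparseable.

    errors="coerce" turns bad values into NaT/NaN instead of raising,
    so malformed rows can be inspected (or dropped) rather than aborting the cell.
    """
    ts = pd.to_datetime(df['ts_minute'], errors='coerce')
    driver = pd.to_numeric(df['driver_id'], errors='coerce')
    duty = pd.to_numeric(df['on_duty'], errors='coerce')
    bad = (ts.isna() | driver.isna() | (driver <= 0)
           | duty.isna() | ~duty.isin([0, 1]))
    return df[bad]
```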

Driver Availability Chart

In [4]:
# Count driver records per minute, split by on_duty status.
# Only well-formed records are kept: a positive driver_id and on_duty of 0 or 1.
good_records = df_raw[(df_raw['driver_id'] > 0) & (df_raw['on_duty'].isin([0, 1]))]
minute_group_good_records = (good_records
                             .groupby(['ts_minute', 'on_duty'])
                             .agg({'driver_id': 'count'})
                             .reset_index())

fig = px.line(minute_group_good_records, x="ts_minute", y="driver_id", color='on_duty',
              color_discrete_sequence=["red", "green"],
              category_orders={"on_duty": [0, 1]})  # off duty (0) is always red, on duty (1) always green
# Edit the layout
fig.update_layout(title='Near Real Time Driver Availability (per minute)'
                  ,xaxis_title='Minutes'
                  ,yaxis_title='Record Count'
                  ,font_size=16
                  ,legend_title_text='On Duty'
                 )
fig.show()
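The chart counts records, so a driver who sends several updates within the same minute is counted more than once. A sketch of the same aggregation that can count distinct drivers instead (nunique) and reshape the result into one column per on_duty value; availability_per_minute is a hypothetical name, and whether distinct counts are the better measure depends on how often consumer.py receives updates per driver, which this notebook does not state.

```python
import pandas as pd


def availability_per_minute(df, distinct=True):
    """Per-minute driver counts with one column per on_duty value (0 and 1)."""
    good = df[(df['driver_id'] > 0) & (df['on_duty'].isin([0, 1]))]
    grouped = good.groupby(['ts_minute', 'on_duty'])['driver_id']
    # nunique counts each driver once per minute; count counts every record
    counts = grouped.nunique() if distinct else grouped.count()
    # unstack moves on_duty into columns; minutes with no records for a state get 0
    return counts.unstack('on_duty', fill_value=0)
```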

Geo-Spatial View of Driver Availability

In [5]:
# Sample up to 500 records for the map (sample(500) raises on smaller frames)
_df_raw = df_raw.sample(min(500, len(df_raw))).copy()
# Expand location ("lat lon") into float lat, lon columns; n must be a keyword argument in pandas 2.x
latlon = _df_raw['location'].str.split(' ', n=1, expand=True).astype(float)
_df_raw['lat'] = latlon[0].to_numpy()
_df_raw['lon'] = latlon[1].to_numpy()

# Keep only points on land; globe.is_land accepts numpy arrays, so no row-wise apply is needed
_df_raw = _df_raw[globe.is_land(_df_raw['lat'].to_numpy(), _df_raw['lon'].to_numpy())].copy()

# Coordinates rounded to one decimal place (roughly 11 km of latitude)
_df_raw['lat_r'] = _df_raw['lat'].round(1)
_df_raw['lon_r'] = _df_raw['lon'].round(1)

# Derived columns
_df_raw['hour'] = _df_raw.ts_minute.dt.hour
_df_raw['minute'] = _df_raw.ts_minute.dt.minute
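The lat_r and lon_r columns are computed above but not used by the map. One possible use, sketched here as an assumption rather than the authors' design, is to bucket drivers into grid cells of 0.1 degree (about 11 km of latitude) and count distinct drivers per cell, which could feed a density map. The hypothetical helper drivers_per_grid_cell drops coordinates outside the valid latitude/longitude ranges but does not apply the land mask.

```python
import pandas as pd


def drivers_per_grid_cell(df, decimals=1):
    """Count distinct drivers per rounded (lat, lon) cell from 'lat lon' location strings."""
    # n must be passed by keyword in pandas 2.x
    coords = df['location'].str.split(' ', n=1, expand=True).astype(float)
    lat, lon = coords[0], coords[1]
    # Keep only coordinates inside the valid latitude/longitude ranges
    valid = lat.between(-90, 90) & lon.between(-180, 180)
    # to_numpy() assigns by position, so duplicate row labels cannot misalign the columns
    cells = df.loc[valid, ['driver_id']].assign(
        lat_r=lat[valid].round(decimals).to_numpy(),
        lon_r=lon[valid].round(decimals).to_numpy(),
    )
    return (cells.groupby(['lat_r', 'lon_r'])['driver_id']
                 .nunique()
                 .rename('drivers')
                 .reset_index())
```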
In [6]:
# plotting the map with the availability data
fig = px.scatter_mapbox(_df_raw, lat="lat", lon="lon"
                        ,hover_name="driver_id"
                        ,color_discrete_sequence=["fuchsia"]
                        ,zoom=4
                        ,height=300)
fig.update_layout(mapbox_style="open-street-map")
fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0})
# fig.update_layout(title='Driver Geo Spatial View',font_size=16)
fig.show()

Audit Trail Analytics

This section analyzes the data coming in from the Kafka stream on a per-minute basis and calculates the following metrics:

  • Received: Total number of records received per minute
  • Processed: Valid records that were processed
  • Deleted: Records that contained NULLs or were otherwise erroneous, and were therefore dropped during transformation
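If every received record is either processed or deleted (an assumption suggested by the definitions above, not verified against consumer.py), then received = processed + deleted for each minute. A sketch that computes any unaccounted difference and the share of records deleted per minute; audit_consistency is a hypothetical helper name.

```python
import pandas as pd


def audit_consistency(df_audit):
    """Add per-minute 'unaccounted' and 'drop_rate' columns to the audit frame.

    Assumes each received record ends up either processed or deleted,
    so a non-zero 'unaccounted' value points at records lost in between.
    """
    out = df_audit.copy()
    out['unaccounted'] = out['received'] - out['processed'] - out['deleted']
    # where() turns zero-traffic minutes into NaN to avoid dividing by zero; they get a rate of 0
    received = out['received'].where(out['received'] > 0)
    out['drop_rate'] = (out['deleted'] / received).fillna(0.0)
    return out
```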
In [12]:
# Creating a dataframe from all the csv files available under the audit directory. 
PATH_AUDIT = ROOT_DIR / 'Data/audit'
EXT = "*.csv"
COL_NAMES = ['ts_minute', 'received', 'processed', 'deleted']

all_csv_audit_files = [file for path, subdir, files in os.walk(PATH_AUDIT) for file in glob(os.path.join(path, EXT))]
#print(all_csv_audit_files)
l_audit = [pd.read_csv(filename, header=None) for filename in all_csv_audit_files]
df_audit = pd.concat(l_audit, axis=0)
df_audit.columns = COL_NAMES
# df_audit=df_audit[df_audit['total_record_received'] > 10]
# print(df_audit.shape)
print("Dataframe from audit data successfully created.")
Dataframe from audit data successfully created.
In [8]:
# Plot the audit data
fig = px.bar(df_audit, x="ts_minute"
              ,y=["received","processed"]
              ,color_discrete_sequence=["yellow", "green"])
# Edit the layout
fig.update_layout(title='Audit Trail (Stacked Bar View)'
                  ,xaxis_title='Minutes'
                  ,yaxis_title='Record Count'
                  ,font_size=16
                  ,legend_title_text='Stream Records'
                 )
fig.show()

Online Driver View

This section analyzes the data to depict the drivers available for hire.

  • Online: Total online drivers
  • Available: Number of online drivers who are free to accept a ride request
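Because available drivers are a subset of online drivers, their ratio gives the share of the online fleet that is free to take a ride. A sketch, assuming available never exceeds online; availability_ratio is a hypothetical helper name, and minutes with no online drivers get a share of 0 rather than a division error.

```python
import pandas as pd


def availability_ratio(df_online):
    """Add the share of online drivers that are available, and the busy count, per minute."""
    out = df_online.copy()
    # NaN for minutes with no online drivers, so the division cannot fail
    online = out['online'].where(out['online'] > 0)
    out['available_share'] = (out['available'] / online).fillna(0.0)
    # Drivers who are online but not available (assumed to be on a ride)
    out['busy'] = out['online'] - out['available']
    return out
```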
In [9]:
# Creating a dataframe from all the csv files available under the online directory. 
PATH_ONLINE = ROOT_DIR / 'Data/online'
EXT = "*.csv"
COL_NAMES = ['ts_minute','online','available']
all_csv_online_files = [file for path, subdir, files in os.walk(PATH_ONLINE) for file in glob(os.path.join(path, EXT))]
l_online = [pd.read_csv(filename, header=None) for filename in all_csv_online_files]
df_online = pd.concat(l_online, axis=0)
df_online.columns = COL_NAMES
df_online = df_online[df_online['available'] >=0]
#print(df_online.shape)
print("Dataframe from online data successfully created.")
Dataframe from online data successfully created.
In [10]:
# Plot the online vs available driver view
fig = px.bar(df_online, x="ts_minute"
              ,y=["online","available"]
              ,color_discrete_sequence=["yellow", "green"]
              ,barmode="group"  # available is a subset of online, so stacking would double-count
             )
# Edit the layout
fig.update_layout(title='Online Driver Availability'
                  ,xaxis_title='Minutes'
                  ,yaxis_title='Driver Count'
                  ,font_size=16
                  ,legend_title_text='Driver Availability'
                 )
fig.show()